Amazon Timestream に シングルメジャーレコードで 100 件以上のセンサー値があるデータを書き込んでみる
背景
今回、Amazon Timestream を使うことがあり、「データ項目が100件以上あるデータをテーブルに書き込みたい」という要件がありました。その際の対応について分かったことを紹介したいと思います。
本記事の注意点
今回は従来からある「シングルメジャーレコード」による書き込みを想定しています。より効率的な書き込みが可能な「マルチメジャーレコード」については次回の記事でご紹介したいと思います。
簡単なスクリプトでデータを書き込んでみる
まずは、公式ドキュメントに掲載されているドキュメントを元に、SDK(boto3)を使って書き込みを行います。
ここでは、デバイスから送られてくるデータとして下記のようなものを想定しています。
{ "timestamp": "2021-04-28 01:14:49", "device_name": "host1", "az": "az1", "region": "us-east-1" "cpu_utilization": "67.93225830354002", "memory_utilization": "88.71518322520387" "proc_numbers": 91 }
サンプルコードは下記です。ドキュメントのものを少し修正して cpu_utilization
、memory_utilization
、proc_numbers
というデータ項目を書き込んでみます。
import boto3 from botocore.config import Config import time import random DatabaseName = 'mytestdb' TableName = 'mytesttbl' def current_milli_time(): return round(time.time() * 1000) session = boto3.Session() write_client = session.client('timestream-write', region_name='us-east-1', config=Config(read_timeout=20, # リクエストタイムアウト(秒) max_pool_connections=5000, # 最大接続数 retries={'max_attempts': 10})) # 最大試行回数 current_time = str(current_milli_time()) dimensions = [ {'Name': 'region', 'Value': 'us-east-1'}, {'Name': 'az', 'Value': 'az1'}, {'Name': 'hostname', 'Value': 'host1'} ] cpu_utilization = { 'Dimensions': dimensions, 'MeasureName': 'cpu_utilization', 'MeasureValue': str(random.uniform(1, 90)), 'MeasureValueType': 'DOUBLE', 'Time': current_time } memory_utilization = { 'Dimensions': dimensions, 'MeasureName': 'memory_utilization', 'MeasureValue': str(random.uniform(1, 90)), 'MeasureValueType': 'DOUBLE', 'Time': current_time } proc_numbers = { 'Dimensions': dimensions, 'MeasureName': 'proc_numbers', 'MeasureValue': str(random.randint(1, 200)), 'MeasureValueType': 'BIGINT', 'Time': current_time } records = [cpu_utilization, memory_utilization, proc_numbers] result = write_client.write_records(DatabaseName=DatabaseName, TableName=TableName, Records=records, CommonAttributes={})
コンソールで確認すると、下記のように同じタイムスタンプでcpu_utilization
、memory_utilization
、proc_numbers
の3項目が3レコードで格納されていることが分かります。
データ項目が101件以上の場合
先程のサンプルコードで Amazon Timestream にどのようにデータが格納されるか基本的なことを確認できました。サンプルコードでは 3件のデータ項目でしたが下記のように項目が 101 個以上ある場合はどうなるでしょうか?
{ "timestamp": "2021-04-28 01:14:49", "device_name": "hoge_device", "data_1": 13.453, "data_2": 20.6124, . (中略) . "data_99": 11.723, "data_100": 81.153, "data_101": 12.267, "data_101": 29.694, . (中略) . "data_199": 26.632, "data_200": 91.124 }
Amazon Timestream の仕様では、「一度のリクエストで書き込めるのは最大100行まで」という制限があるので、シングルメジャーレコードで 100 件以上のを書き込みたい場合は、複数回に分けて書く必要があります。
Records per WriteRecords API request
- The maximum number of records in a WriteRecords API request. : 100
気になったこと
さて、サービスの仕様から「101 件以上のレコードを書き込みたい」場合は 2 回以上に分けて書き込めばいいことは分かりました。このとき次のようなことが気になりました。
「100件書き込みを行う場合」と「200件書き込みを行う場合」で、書き込み時間が倍にならないか?
検証してみた
調べてもよく分からなかったので、実際に書き込みを行って簡単な検証をしてみることにしました。
200 件書き込みするサンプルコード
先程のコードを元に下記のような検証スクリプト(write_single_measure.py
)を用意しました。
import boto3 from botocore.config import Config import time import random session = boto3.Session() write_client = session.client('timestream-write', region_name='us-east-1', config=Config(read_timeout=20, # リクエストタイムアウト(秒) max_pool_connections=5000, # 最大接続数 retries={'max_attempts': 10})) # 最大試行回数 DatabaseName='mytestdb' TableName='mytesttbl' def current_milli_time(): return round(time.time() * 1000) def gen_dimensions(): municipalities = random.choice(['Shinjuku', 'Toshima', 'Nakano', 'Ota', 'Chiyoda']) gateway_id = random.choice(['gateway_1', 'gateway_2', 'gateway_3']) device_name = str(municipalities) + '_' + str(gateway_id) dimensions = [ {'Name': 'Location', 'Value': 'Tokyo'}, {'Name': 'Municipalities', 'Value': municipalities}, {'Name': 'DeviceName', 'Value': device_name} ] return dimensions def gen_dummy_measure(): records_X = [] for measure_num in range(0,100): # 何件のレコードを作成するか ;100件 (0,100) , 最大100 MeasureValue = str(random.uniform(1, 90)) dummy_measure_name = 'data_' + str(measure_num) dummy_measure = { 'Dimensions': gen_dimensions(), 'MeasureName': dummy_measure_name, 'MeasureValue': MeasureValue, 'MeasureValueType': 'DOUBLE', 'Time': current_time } records_X.append(dummy_measure) return records_X for write_num in range(0,2): # 生成したレコードを何回書き込むか current_time = str(current_milli_time()) print ("start write_records...: " + str(write_num)) result = write_client.write_records(DatabaseName=DatabaseName, TableName=TableName, Records=gen_dummy_measure(), CommonAttributes={})
想定する環境は下記のとおりです。
- 都内にある IoT ゲートウェイからのデータを想定
- 一度に送られてくるデータ項目は 100 件以上
上記のコードでは、31 行目で 100 件のデータ(レコード)を作成しています。また 44 行目で 100 レコードの作成を 2 回行い、100 件ずつ 2 回に分けてデータを書き込みを行います。これにより「 100 件以上( 200 件 )のセンサーデータを書き込む」状況を再現しています。
AWS Cloudshell で書き込み
Amazon Timestream はまだ東京リージョンにないので「N.Virginia」リージョンを使います。事前に適当なデータベース、テーブルを作っておきます。
また、今回は同じ「N.Virginia」リージョンの AWS Cloudshell を使って先程のコードを実行しようと思うので、その準備を行います。(手元の PC から書き込むと距離的なレイテンシが発生するため)
AWS Cloudshell を開いて boto3 をアップデートしておきます。
$ curl -kL https://bootstrap.pypa.io/get-pip.py | python3 $ pip install -U boto3
アップデート後のバージョンは下記の通りです。
$ pip show boto3 Name: boto3 Version: 1.20.48 Summary: The AWS SDK for Python Home-page: https://github.com/boto/boto3 Author: Amazon Web Services Author-email: License: Apache License 2.0 Location: /home/cloudshell-user/.local/lib/python3.7/site-packages Requires: botocore, jmespath, s3transfer Required-by: aws-sam-cli, aws-sam-translator, serverlessrepo
次に、先程のコードを適当な名前(write_single_measure.py
)を付けて AWS Cloudshell に保存します。ファイルをアップロードする方法や、vi
等で直接書き込むなど好きな形で対応してください。
( vi
でコピペするとインデントがずれるので、コピペする前にコマンドモードで:set paste
を実行しておきます。)
コードを実行してみる
単純にスクリプトを実行するだけではつまらないので、下記のパターンで書き込みを試してみたいと思います。
書き込むレコード数を変えて、それぞれのリクエストのレイテンシがどのように変わるか確認してみます。
- ① 2回のリクエストで 100 行ずつのレコードを書き込む処理を 10 回繰り返す。
- これを 1 分間隔で 30 分間実行する
- ② 1回のリクエストで 100 行のレコードを書き込む処理を 10 回繰り返す。
- これを 1 分間隔で 30 分間実行する
最初に①を行います。下記のシェルスクリプト(write_record.sh
)を使って30分間の書き込みを行うので、 これも AWS Cloudshell に保存しておきます。
#!/bin/bash for count in {1..30} do # write single measure record for i in {1..10} do python3 write_single_measure.py done echo "sleep 60..." sleep 60 done
準備ができたらシェルスクリプトを実行して 30 分待機します。
$ sh write_record.sh
次に先程のサンプルコード(write_single_measure.py
)を下記の様に修正して②を実行します。
for write_num in range(0,2): ↓ for write_num in range(0,1):
修正できたら再度シェルスクリプトを実行します。
$ sh write_record.sh
CloudWatch で確認してみる
CloudWatch では「SuccessfulRequestLatency」というメトリクスを確認してみます。
下記はそのグラフです。今回はスクリプトの実行に 30分かかるので、時間短縮のために別データベースに同時に書き込みを行いました。それぞれの色の意味は下記です。
- オレンジ色:「① 2回のリクエストで 100 レコードずつ書き込む処理を10回」行ったグラフ
- 青色:「② 1回のリクエストで 100 レコードを書き込む処理を10回」行ったグラフ
また、上側のグラフはプロットする間隔を「1分」にしたもので、下側は「5分」にしたものになります。(統計はいずれも「平均」です)
次のグラフは初回書き込み以降の部分だけ抽出したものです。
テーブルを作り直したり、一晩何もしないで翌朝同じ書き込みを試すなど、何度か同じことを繰り返してみた所、下記のような動きになりました。
- 初回書き込み時は
SuccessfulRequestLatency
の値が高い - 2回目以降は
SuccessfulRequestLatency
の値が徐々に下がり安定した - 200 レコード書き込む方が
SuccessfulRequestLatency
の値が高いが最大でも 5ms 程度の違い
今回検証したボリュームでは、ほとんど書き込み速度などに違いは見られませんでしたが、書き込むクライアントの実装やボリューム次第で結果は変わるかと思いますので、ご利用される際は事前に自分のユースケースに沿って検証されるのがよいかと思います。
1回のリクエストで101件以上のレコード書き込みをしてみる
仕様として「1回のリクエストで101件以上のレコード書き込み」はエラーになりますが、実際に試してみました。200 レコードを書き込んだサンプルコードの31 行目を for measure_num in range(0,101)
として保存します。
修正したスクリプトを実行してみると下記のエラーが返ってきました。仕様どおり「1回のリクエストで書き込めるレコード数の最大値は 100」であることが分かりました。
(見やすさのため改行しています。)
version=null, measureValues=null)]' at 'records' failed to satisfy constraint: Member must have length less than or equal to 100
最後に
今回は、従来からある「シングルメジャーレコード」として書き込みを行いました。これは「1レコードに1つのデータ項目」を書く方法になります。そのため、今回のように項目が多い場合その分レコード数も増える形となります。
これを改善する方法として、2021年11月29日リリースされた「マルチメジャーレコード」を使う方法があります。マルチメジャーレコードを使うと「複数のデータ項目を1つのレコード」として書き込めるようになり、コスト効率も上がります。
次回は、「マルチメジャーレコード」による書き込みを試してみたいと思います。